home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / access / heap / hio.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  10.4 KB  |  387 lines

  1. /*
  2.  * hio.c --
  3.  *    POSTGRES heap access method input/output code.
  4.  */
  5.  
  6. #include "tmp/c.h"
  7.  
  8. RcsId("$Header: /private/postgres/src/access/heap/RCS/hio.c,v 1.14 1992/03/05 00:36:21 hong Exp $");
  9.  
  10. #include "access/heapam.h"
  11. #include "access/hio.h"
  12. #include "access/htup.h"
  13.  
  14. #include "storage/block.h"
  15. #include "storage/buf.h"
  16. #include "storage/bufmgr.h"
  17. #include "storage/bufpage.h"
  18. #include "storage/itemid.h"
  19. #include "storage/itemptr.h"
  20. #include "storage/off.h"
  21.  
  22. #include "utils/memutils.h"
  23. #include "utils/log.h"
  24. #include "utils/rel.h"
  25.  
  26. /*
  27.  *    amputunique    - place tuple at tid
  28.  *
  29.  *    Currently on errors, calls elog.  Perhaps should return -1?
  30.  *    Possible errors include the addition of a tuple to the page
  31.  *    between the time the linep is chosen and the page is L_UP'd.
  32.  *
  33.  *    This should be coordinated with the B-tree code.
  34.  *    Probably needs to have an amdelunique to allow for
  35.  *    internal index records to be deleted and reordered as needed.
  36.  *    For the heap AM, this should never be needed.
  37.  */
  38. void
  39. RelationPutHeapTuple(relation, blockIndex, tuple)
  40.     Relation    relation;
  41.     BlockNumber    blockIndex;
  42.     HeapTuple    tuple;
  43. {
  44.     Buffer        buffer;
  45.     Page        pageHeader;
  46.     BlockNumber    numberOfBlocks;
  47.     OffsetIndex    offsetIndex;
  48.     unsigned    len;
  49.     ItemId        itemId;
  50.     Item        item;
  51.  
  52.     /* ----------------
  53.      *    increment access statistics
  54.      * ----------------
  55.      */
  56.     IncrHeapAccessStat(local_RelationPutHeapTuple);
  57.     IncrHeapAccessStat(global_RelationPutHeapTuple);
  58.             
  59.     Assert(RelationIsValid(relation));
  60.     Assert(HeapTupleIsValid(tuple));
  61.  
  62.     numberOfBlocks = RelationGetNumberOfBlocks(relation);
  63.     Assert(IndexIsInBounds(blockIndex, numberOfBlocks));
  64.  
  65.     buffer = ReadBuffer(relation, blockIndex);
  66. #ifndef NO_BUFFERISVALID
  67.     if (!BufferIsValid(buffer)) {
  68.         elog(WARN, "RelationPutHeapTuple: no buffer for %ld in %s",
  69.             blockIndex, &relation->rd_rel->relname);
  70.     }
  71. #endif
  72.  
  73.     pageHeader = LintCast(Page, BufferSimpleGetPage(buffer));
  74.     len = (unsigned)LONGALIGN(tuple->t_len);
  75.     Assert((int)len <= PageGetFreeSpace(pageHeader));
  76.  
  77.     offsetIndex = -1 + PageAddItem((Page)pageHeader, (Item)tuple,
  78.         tuple->t_len, InvalidOffsetNumber, LP_USED);
  79.  
  80.     itemId = PageGetItemId((Page)pageHeader, offsetIndex);
  81.     item = PageGetItem((Page)pageHeader, itemId);
  82.  
  83.     ItemPointerSimpleSet(&LintCast(HeapTuple, item)->t_ctid, blockIndex,
  84.         1 + offsetIndex);
  85.  
  86.     HeapTupleStoreRuleLock(LintCast(HeapTuple, item), buffer);
  87.  
  88.     WriteBuffer(buffer);
  89.     /* return an accurate tuple */
  90.     ItemPointerSimpleSet(&tuple->t_ctid, blockIndex, 1 + offsetIndex);
  91. }
  92.  
  93. /*
  94.  *    amputnew    - adds a tuple on a new page
  95.  *
  96.  *    Currently allows any sized tuples to be placed on a new
  97.  *    page.  Long tuples must be inserted via amputnew.
  98.  *
  99.  *    Eventually, should have ability to specify approximate location
  100.  *    for implementation of aggregates.    XXX
  101.  */
  102. /* TID    *   handle this properly XXX */
  103.  
  104. void
  105. RelationPutLongHeapTuple(relation, tuple)
  106.     Relation    relation;
  107.     HeapTuple    tuple;
  108. {
  109.     Buffer        b;
  110.     Buffer        headb;
  111.     PageHeader    dp;
  112.     int        off, taillen, tailtlen;
  113.     unsigned long    len;
  114.     ItemId        lpp;
  115.     char        *tp;
  116.     ItemContinuation    tcp, headtcp;
  117.     unsigned long    blocks;
  118.     BlockNumber    blockNumber;
  119.  
  120.     /* ----------------
  121.      *    increment access statistics
  122.      * ----------------
  123.      */
  124.     IncrHeapAccessStat(local_RelationPutLongHeapTuple);
  125.     IncrHeapAccessStat(global_RelationPutLongHeapTuple);
  126.     
  127.     Assert(RelationIsValid(relation));
  128.     Assert(HeapTupleIsValid(tuple));
  129.  
  130. /* correct validity checking here and elsewhere */
  131.     headb = ReadBuffer(relation, P_NEW);
  132. #ifndef NO_BUFFERISVALID
  133.     if (!BufferIsValid(headb)) {
  134.         elog(WARN,
  135.             "RelationPutLongHeapTuple: no new buffer for block #1");
  136.     }
  137. #endif
  138.     dp = LintCast(PageHeader, BufferSimpleGetPage(headb));
  139.     len = (unsigned long)LONGALIGN(tuple->t_len);
  140.     ItemPointerSimpleSet(&tuple->t_ctid, BufferGetBlockNumber(headb), 1);
  141.     if (len <= MAXTUPLEN) {
  142.         taillen = blocks = 0;
  143.     } else {
  144.         tailtlen = tuple->t_len - len;
  145.         if ((len -= MAXTUPLEN) <= MAXTCONTLEN) {
  146.             blocks = 0;
  147.             taillen = MAXTUPLEN;
  148.         } else {
  149.             taillen = MAXTUPLEN;
  150.             blocks = --len / MAXTCONTLEN;    /* XXX overflow */
  151.             len %= MAXTCONTLEN;
  152.             len++;
  153.         }
  154.         /* keep the header as small as possible but unfragmented */
  155.         if (len < tuple->t_hoff) {
  156.             taillen -= tuple->t_hoff - len;
  157.             len = tuple->t_hoff;    /* assumed long aligned */
  158.         }
  159.         tailtlen = taillen - tailtlen;
  160.     }
  161.     /*
  162.      *    At this point, len is the data length in the first
  163.      *    (header) page, taillen is the length of the last
  164.      *    (tail) page, and pages is the number of intervening
  165.      *    full data pages.
  166.      *
  167.      *    Now write the pages in order 1, N, N-1, N-2, ..., 2.
  168.      */
  169.     if (!taillen) {
  170.         off = BLCKSZ - len;
  171.         tp = (char *)dp + off;
  172.         len = tuple->t_len;
  173.         bcopy((char *)tuple, tp, (int)len);
  174.     } else {
  175.         off = BLCKSZ - len - TCONTPAGELEN;
  176.         headtcp = (ItemContinuation)((char *)dp + off);
  177.         bcopy((char *)tuple, headtcp->itemData, (int)len);
  178.         tp = (char *)tuple + len + MAXTCONTLEN * (int)blocks;
  179.         len += TCONTPAGELEN;
  180.         /*
  181.          *    At this point, tp points to the data in
  182.          *    the tail of the tuple and headtcp points
  183.          *    to the struct tcont space.
  184.          */    
  185.     }
  186.     BufferSimpleInitPage(headb);
  187.  
  188.     dp->pd_lower += sizeof *lpp;
  189.     dp->pd_upper = off;
  190. /*    PageGetMaxOffsetIndex(dp) += 1;
  191. */
  192.     lpp = dp->pd_linp;
  193.     (*lpp).lp_off = off;
  194.     (*lpp).lp_len = len;
  195.     if (!taillen)
  196.         (*lpp).lp_flags = LP_USED;
  197.     else {
  198.         (*lpp).lp_flags = LP_USED | LP_DOCNT;
  199.         b = ReadBuffer(relation, P_NEW);
  200.         if (BufferIsInvalid(b)) {
  201.             elog(WARN, "RelationPutLongHeapTuple: no new block #$");
  202.         }
  203.         dp = (PageHeader)BufferSimpleGetPage(b);
  204.         off = BLCKSZ - taillen;
  205.  
  206.         BufferSimpleInitPage(b);
  207.         dp->pd_lower += sizeof *lpp;
  208.         dp->pd_upper = off;
  209. /*        PageGetMaxOffsetIndex(dp) += 1;
  210. */
  211.  
  212.         lpp = dp->pd_linp;
  213.         (*lpp).lp_off = off;
  214.         (*lpp).lp_len = tailtlen;
  215.         (*lpp).lp_flags = LP_USED | LP_CTUP;
  216.         blockNumber = BufferGetBlockNumber(b);
  217.         bcopy(tp, (char *)dp + off, tailtlen);
  218.         if (BufferWriteInOrder(BufferGetBufferDescriptor(b),
  219.                        BufferGetBufferDescriptor(headb)) < 0)
  220.         {
  221.             elog(WARN,
  222.                 "RelationPutLongHeapTuple: no WriteInOrder #$");
  223.         }
  224.         WriteBuffer(b);
  225.         while (blocks--) {
  226.             b = ReadBuffer(relation, P_NEW);
  227. #ifndef NO_BUFFERISVALID
  228.             if (!BufferIsValid(b)) {
  229.                 elog(WARN,
  230.                     "RelationPutLongHeapTuple: no new #n");
  231.             }
  232. #endif
  233.             dp = (PageHeader)BufferSimpleGetPage(b);
  234.             tp -= MAXTCONTLEN;
  235.             tcp = (ItemContinuation)(dp + 1);
  236.             ItemPointerSet(&tcp->itemPointerData,
  237.                 SinglePagePartition, blockNumber, (PageNumber)0,
  238.                 (OffsetNumber)1);
  239. /*
  240.             BlockIdSet(tcp->tc_page, blockNumber);
  241. */
  242.             bcopy(tp, tcp->itemData, MAXTCONTLEN);
  243.  
  244.             BufferSimpleInitPage(b);
  245.             dp->pd_lower += sizeof *lpp;
  246.             dp->pd_upper = sizeof *dp;
  247. /*            PageGetMaxOffsetIndex(dp) += 1;
  248. */
  249.  
  250.             lpp = dp->pd_linp;
  251.             (*lpp).lp_off = sizeof *dp;
  252.             (*lpp).lp_len = MAXTUPLEN;    /* TCONTLEN + PAGELEN */
  253.             (*lpp).lp_flags = LP_USED | LP_DOCNT | LP_CTUP;
  254.             blockNumber = BufferGetBlockNumber(b);
  255.             if ( BufferWriteInOrder(
  256.                 BufferGetBufferDescriptor(b),
  257.                 BufferGetBufferDescriptor(headb)) < 0 )
  258.             {
  259.                 elog(WARN,
  260.                     "RelationPutLongHeapTuple: no WriteIn");
  261.             }
  262.             WriteBuffer(b);
  263.         }
  264.         ItemPointerSet(&headtcp->itemPointerData, SinglePagePartition,
  265.             blockNumber, (PageNumber)0, (OffsetNumber)1);
  266.         tp = &headtcp->itemData[0];
  267. /*
  268.         BlockIdSet(headtcp->tc_page, blockNumber);
  269. */
  270.     }
  271.  
  272.     HeapTupleStoreRuleLock((HeapTuple)tp, headb);
  273.  
  274.     WriteBuffer(headb);
  275.  
  276.     /* return(TIDFOR(blockNumber, 0)); */
  277. }
  278.  
  279. /*
  280.  * The heap_insert routines "know" that a buffer page is initialized to
  281.  * zero when a BlockExtend operation is performed. 
  282.  */
  283.  
  284. #define PageIsNew(page) ((page)->pd_upper == 0)
  285.  
  286. /*
  287.  * This routine is another in the series of attempts to reduce the number
  288.  * of I/O's and system calls executed in the various benchmarks.  In
  289.  * particular, this routine is used to append data to the end of a relation
  290.  * file without excessive lseeks.  This code should do no more than 2 semops
  291.  * in the ideal case.
  292.  *
  293.  * Eventually, we should cache the number of blocks in a relation somewhere.
  294.  * Until that time, this code will have to do an lseek to determine the number
  295.  * of blocks in a relation.
  296.  * 
  297.  * This code should ideally do at most 4 semops, 1 lseek, and possibly 1 write
  298.  * to do an append; it's possible to eliminate 2 of the semops if we do direct
  299.  * buffer stuff (!); the lseek and the write can go if we get
  300.  * RelationGetNumberOfBlocks to be useful.
  301.  *
  302.  * NOTE: This code presumes that we have a write lock on the relation.
  303.  *
  304.  * Also note that this routine probably shouldn't have to exist, and does
  305.  * screw up the call graph rather badly, but we are wasting so much time and
  306.  * system resources being massively general that we are losing badly in our
  307.  * performance benchmarks.
  308.  */
  309.  
  310. void
  311. RelationPutHeapTupleAtEnd(relation, tuple)
  312.  
  313. Relation relation;
  314. HeapTuple tuple;
  315.  
  316. {
  317.     Buffer        buffer;
  318.     Page        pageHeader;
  319.     BlockNumber    lastblock;
  320.     OffsetIndex    offsetIndex;
  321.     unsigned    len;
  322.     ItemId        itemId;
  323.     Item        item;
  324.  
  325.     Assert(RelationIsValid(relation));
  326.     Assert(HeapTupleIsValid(tuple));
  327.  
  328.     /*
  329.      * XXX This does an lseek - VERY expensive - but at the moment it
  330.      * is the only way to accurately determine how many blocks are in
  331.      * a relation.  A good optimization would be to get this to actually
  332.      * work properly.
  333.      */
  334.  
  335.     lastblock = RelationGetNumberOfBlocks(relation);
  336.  
  337.     if (lastblock == 0)
  338.     {
  339.         buffer = ReadBuffer(relation, lastblock);
  340.         pageHeader = LintCast(Page, BufferSimpleGetPage(buffer));
  341.         if (PageIsNew((PageHeader) pageHeader))
  342.         {
  343.             buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
  344.             pageHeader = LintCast(Page, BufferSimpleGetPage(buffer));
  345.             BufferSimpleInitPage(buffer);
  346.         }
  347.     }
  348.     else
  349.         buffer = ReadBuffer(relation, lastblock - 1);
  350.  
  351.     pageHeader = LintCast(Page, BufferSimpleGetPage(buffer));
  352.     len = (unsigned)LONGALIGN(tuple->t_len);
  353.  
  354.     /*
  355.      * Note that this is true if the above returned a bogus page, which
  356.      * it will do for a completely empty relation.
  357.      */
  358.  
  359.     if (len > PageGetFreeSpace(pageHeader))
  360.     {
  361.         buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
  362.         pageHeader = LintCast(Page, BufferSimpleGetPage(buffer));
  363.         BufferSimpleInitPage(buffer);
  364.  
  365.         if (len > PageGetFreeSpace(pageHeader))
  366.             elog(WARN, "Tuple is too big: size %d", len);
  367.     }
  368.  
  369.     offsetIndex = PageAddItem((Page)pageHeader, (Item)tuple,
  370.         tuple->t_len, InvalidOffsetNumber, LP_USED) - 1;
  371.  
  372.     itemId = PageGetItemId((Page)pageHeader, offsetIndex);
  373.     item = PageGetItem((Page)pageHeader, itemId);
  374.  
  375.     lastblock = BufferGetBlockNumber(buffer);
  376.  
  377.     ItemPointerSimpleSet(&LintCast(HeapTuple, item)->t_ctid,
  378.                  lastblock, 1 + offsetIndex);
  379.  
  380.     HeapTupleStoreRuleLock(LintCast(HeapTuple, item), buffer);
  381.  
  382.     /* return an accurate tuple */
  383.     ItemPointerSimpleSet(&tuple->t_ctid, lastblock, 1 + offsetIndex);
  384.  
  385.     WriteBuffer(buffer);
  386. }
  387.